package com.atguigu.gmall1118.app

import com.atguigu.gmall1118.bean.TagInfo
import com.atguigu.gmall1118.dao.TagInfoDAO
import com.atguigu.gmall1118.util.{ClickhouseUtil, MyPropertiesUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import java.util.Properties


object TaskExportCk {


  //准备：clickhouse 单节点
  //
  //建立clickhouse的宽表
  //  1 手动自动 ？ 2 每天一张？ 一共一张？
  // 建表逻辑跟hive宽表大体一致
  //
  //读取画像库的标签宽表
  // java List  不行 driver单点内存 放不下
  //rdd df ds  可以
  //
  //写入到clickhouse中的宽表
  //   可以利用jdbc来实现任意支持sql数据库的读写
  def main(args: Array[String]): Unit = {

    val taskId=args(0)
    val busiDate=args(1)

    val sparkConf: SparkConf = new SparkConf().setAppName("task_export_ck_app")//.setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()


    val properties: Properties = MyPropertiesUtil.load("config.properties")
    val CLICKHOUSE_URL = properties.getProperty("clickhouse.url")

    val upDbName: String = properties.getProperty("user-profile.dbname")


    //create table  表名            可以和hive宽表一致
    //( 字段 字段类型)              可以和hive宽表一致  类型一律string
    //引擎                              MergeTree    SummingMergeTree  ReplacingMergeTree(幂等)
    //                                        批处理 不用考虑数据重复问题，如果执行失败，删掉当日数据重跑即可。
    //分区                        可以和hive宽表一致，每天一张表 不用分区
    //主键                      uid       与排序列一致可省
    //排序                      uid

    // 查询mysql获得启用的标签对象 TagInfoList
    val tagInfoList: List[TagInfo] = TagInfoDAO.getTagInfoListWithOn()
    //  建表
    //  根据启用的标签列表 来决定宽表的字段
    val tableName =s"user_tag_merge_${busiDate.replace("-","")}"

    val fieldList: List[String] = tagInfoList.map(tagInfo => s"${tagInfo.tagCode.toLowerCase} String")
    val fieldsSql: String = fieldList.mkString(",")


    val createTableSQL=
      s"""
         |create table if not exists $tableName
         |(uid String ,$fieldsSql)
         |engine=MergeTree
         |order by uid
         |""".stripMargin
    val dropTableSQL=s" drop table if exists $tableName"
    println(dropTableSQL)
    ClickhouseUtil.executeSql(dropTableSQL)
    println(createTableSQL)

    ClickhouseUtil.executeSql(createTableSQL)

    //查询hive中的宽表 读取为dataframe

    val dataFrame: DataFrame = sparkSession.sql(s"select * from $upDbName.$tableName")

    //写入到clickhouse中的宽表
    dataFrame.write.mode(SaveMode.Append).option("batchsize",1000)  //攒批写入 //并行度 //事务关闭
      .option("isolationLevel", "NONE") // 关闭事务
      .option("numPartitions", "12") // 设置并发
      .option("driver","ru.yandex.clickhouse.ClickHouseDriver")
      .jdbc(CLICKHOUSE_URL, tableName,new Properties())

  }

}
